# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import mock
import unittest

from pyarrow.filesystem import LocalFileSystem, S3FSWrapper
from pyarrow.lib import ArrowIOError
from six.moves.urllib.parse import urlparse

from petastorm.hdfs.tests.test_hdfs_namenode import HC, MockHadoopConfiguration, \
  MockHdfs, MockHdfsConnector

from pycarbon.core.carbon_fs_utils import CarbonFilesystemResolver
from pycarbon.tests import access_key, secret_key, endpoint

# Absolute path reused by the tests below to build schemeless, file:// and s3a:// dataset URLs.
ABS_PATH = '/abs/path'


class FilesystemResolverTest(unittest.TestCase):
  """
  Checks the full filesystem resolution functionality, exercising each URL interpretation case.

  Each test builds a ``CarbonFilesystemResolver`` from a dataset URL and inspects the resolved
  filesystem, the parsed dataset URL and, for HDFS URLs, the connection attempts recorded by the
  ``MockHdfsConnector`` shared across the class.
  """

  @classmethod
  def setUpClass(cls):
    """Creates the mock HDFS connector that is shared by every test in this class."""
    cls.mock = MockHdfsConnector()

  def setUp(self):
    """Initializes a mock hadoop config and populates it with basic properties."""
    # Reset counters in mock connector so each test starts counting connection attempts from zero
    self.mock.reset()
    self._hadoop_configuration = MockHadoopConfiguration()
    self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE)
    # nn2 is deliberately listed first: the nameservice tests expect the connection to be attempted on nn2.
    self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn2,nn1')
    self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn1'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN1)
    self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn2'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN2)

  def test_error_url_cases(self):
    """Various error cases that result in exception raised."""
    # Case 1: Schemeless path asserts
    with self.assertRaises(ValueError):
      CarbonFilesystemResolver(ABS_PATH, {})

    # Case 4b: HDFS default path case with NO defaultFS
    with self.assertRaises(RuntimeError):
      CarbonFilesystemResolver('hdfs:///some/path', {})

    # Case 4b: Using `default` as host, while apparently a pyarrow convention, is NOT valid
    with self.assertRaises(ArrowIOError):
      CarbonFilesystemResolver('hdfs://default', {})

    # Case 5: other schemes result in ValueError; urlparse to cover an else branch!
    with self.assertRaises(ValueError):
      CarbonFilesystemResolver(urlparse('http://foo/bar'), {})
    with self.assertRaises(ValueError):
      CarbonFilesystemResolver(urlparse('ftp://foo/bar'), {})
    with self.assertRaises(ValueError):
      CarbonFilesystemResolver(urlparse('ssh://foo/bar'), {})

    # s3 paths must have the bucket as the netloc
    with self.assertRaises(ValueError):
      CarbonFilesystemResolver(urlparse('s3:///foo/bar'), {})

  def test_file_url(self):
    """ Case 2: File path, agnostic to content of hadoop configuration."""
    suj = CarbonFilesystemResolver('file://{}'.format(ABS_PATH), hadoop_configuration=self._hadoop_configuration,
                                   connector=self.mock)
    # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance(...))
    self.assertIsInstance(suj.filesystem(), LocalFileSystem)
    self.assertEqual('', suj.parsed_dataset_url().netloc)
    self.assertEqual(ABS_PATH, suj.get_dataset_path())

  def test_hdfs_url_with_nameservice(self):
    """ Case 3a: HDFS nameservice."""
    suj = CarbonFilesystemResolver(dataset_url=HC.WARP_TURTLE_PATH, hadoop_configuration=self._hadoop_configuration,
                                   connector=self.mock)
    self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
    self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
    # Only the first configured namenode (nn2) should have been contacted
    self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
    self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
    self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

  def test_hdfs_url_no_nameservice(self):
    """ Case 3b: HDFS with no nameservice should connect to default namenode."""
    suj = CarbonFilesystemResolver(dataset_url='hdfs:///some/path', hadoop_configuration=self._hadoop_configuration,
                                   connector=self.mock)
    self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
    self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
    # ensure path is preserved in parsed URL
    self.assertEqual('/some/path', suj.get_dataset_path())
    self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
    self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
    self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

  def test_hdfs_url_direct_namenode(self):
    """ Case 4: direct namenode."""
    suj = CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN1),
                                   hadoop_configuration=self._hadoop_configuration,
                                   connector=self.mock)
    self.assertEqual(MockHdfs, type(suj.filesystem()))
    self.assertEqual(HC.WARP_TURTLE_NN1, suj.parsed_dataset_url().netloc)
    self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
    self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
    self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

  def test_hdfs_url_direct_namenode_retries(self):
    """ Case 4: direct namenode connection fails on the first two attempts; the third attempt succeeds."""
    self.mock.set_fail_n_next_connect(2)
    # First attempt: the constructor is expected to raise, so its result is intentionally not bound.
    with self.assertRaises(ArrowIOError):
      CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
                               hadoop_configuration=self._hadoop_configuration,
                               connector=self.mock)
    self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
    self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
    self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
    # Second attempt: still within the configured failure budget, so it raises again.
    with self.assertRaises(ArrowIOError):
      CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
                               hadoop_configuration=self._hadoop_configuration,
                               connector=self.mock)
    self.assertEqual(2, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
    self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
    self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
    # this one should connect "successfully"
    suj = CarbonFilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
                                   hadoop_configuration=self._hadoop_configuration,
                                   connector=self.mock)
    self.assertEqual(MockHdfs, type(suj.filesystem()))
    self.assertEqual(HC.WARP_TURTLE_NN2, suj.parsed_dataset_url().netloc)
    self.assertEqual(3, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
    self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
    self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))

  def test_s3_without_s3fs(self):
    """Resolving an s3a URL raises ValueError when the s3fs module cannot be imported."""
    with mock.patch.dict('sys.modules', s3fs=None):
      # `import s3fs` will fail in this context
      with self.assertRaises(ValueError):
        CarbonFilesystemResolver(urlparse('s3a://foo/bar'), {})

  def test_s3_url(self):
    """An s3a URL resolves to an S3FSWrapper, with the bucket as netloc and as dataset path prefix."""
    suj = CarbonFilesystemResolver('s3a://bucket{}'.format(ABS_PATH),
                                   key=access_key,
                                   secret=secret_key,
                                   endpoint=endpoint,
                                   hadoop_configuration=self._hadoop_configuration, connector=self.mock)
    # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance(...))
    self.assertIsInstance(suj.filesystem(), S3FSWrapper)
    self.assertEqual('bucket', suj.parsed_dataset_url().netloc)
    self.assertEqual('bucket' + ABS_PATH, suj.get_dataset_path())


# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()
